package pub.ryan.dw.pub.ryan.dw.idmp

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import pub.ryan.commons.util.SparkUtil

import scala.collection.immutable

//埋点日志id映射：电脑、手机、微信日志
//user->uid
//user->phone->imei/mac/imsi/androidId/deviceId/uuid
/**
 * ID-mapping job for tracking logs coming from three sources: web, app and WeChat mini-program.
 *
 * Every log line is a JSON document of the form
 * `user -> uid` and `user -> phone -> imei / mac / imsi / androidId / deviceId / uuid`.
 * All non-blank identifiers found on one line are treated as belonging to the same user.
 * The identifiers become vertices of a graph, identifier pairs that co-occur often enough
 * become edges, and the connected components of that graph give every identifier a
 * group id ("gid").
 */
object LogDataidmp {

  import scala.util.Try

  /** Default directory that contains the `web`, `app` and `wxapp` log sub-directories. */
  private val DefaultLogDir: String = "D:\\Project\\p01\\logs\\2020-01-11"

  /** Default directory the resulting parquet files are written to. */
  private val DefaultOutputDir: String = "data\\idmp\\2020-01-11\\"

  /**
   * An identifier pair is kept as an edge only when it was seen MORE than this many times.
   *
   * NOTE(review): the original comment spoke of dropping edges seen "fewer than 2" times,
   * while the code has always required `count > 2`. The code's behaviour is preserved here;
   * confirm which threshold is actually intended.
   */
  private val EdgeCountThreshold: Int = 2

  /** Keys read from the `user.phone` object, in the order they are emitted. */
  private val PhoneIdKeys: Seq[String] =
    Seq("imei", "mac", "imsi", "androidId", "deviceId", "uuid")

  /**
   * Runs the job.
   *
   * @param args optional overrides: `args(0)` is the directory containing the `web`, `app`
   *             and `wxapp` log sub-directories, `args(1)` is the parquet output directory.
   *             When absent, the original hard-coded locations are used.
   */
  def main(args: Array[String]): Unit = {
    val logDir: String = if (args.length > 0) args(0) else DefaultLogDir
    val outputDir: String = if (args.length > 1) args(1) else DefaultOutputDir
    // Local copy so the RDD closure below captures a plain Int.
    val threshold: Int = EdgeCountThreshold

    val sparkSession: SparkSession = SparkUtil.getSparkSession()
    try {
      // Needed for `toDF` on the result RDD.
      import sparkSession.implicits._

      // Load the three kinds of logs.
      val weblog: Dataset[String] = sparkSession.read.textFile(childPath(logDir, "web"))
      val applog: Dataset[String] = sparkSession.read.textFile(childPath(logDir, "app"))
      val wxapplog: Dataset[String] = sparkSession.read.textFile(childPath(logDir, "wxapp"))

      // Extract the identifiers of every line and merge the three sources into one RDD.
      // Cached because both the vertex set and the edge set are derived from it.
      val ids: RDD[Array[String]] =
        logds(applog).union(logds(weblog)).union(logds(wxapplog)).cache()

      // Vertices: (hash of identifier, identifier).
      // NOTE(review): String.hashCode is only 32 bits wide, so two different identifiers can
      // map to the same vertex id. Left unchanged to keep previously written ids comparable.
      val vertices: RDD[(Long, String)] =
        ids.flatMap(arr => arr.map(id => (id.hashCode.toLong, id)))

      // Edges: every pair (i < j) of identifiers found on the same line ...
      val edges: RDD[Edge[String]] = ids
        .flatMap { arr =>
          for (i <- 0 until arr.length - 1; j <- i + 1 until arr.length)
            yield Edge(arr(i).hashCode.toLong, arr(j).hashCode.toLong, "")
        }
        // ... counted word-count style, keeping only pairs seen more than `threshold` times.
        .map(edge => (edge, 1))
        .reduceByKey(_ + _)
        .filter { case (_, count) => count > threshold }
        .map { case (edge, _) => edge }

      // Build the graph and run the connected-components algorithm.
      val graph: Graph[String, String] = Graph(vertices, edges)
      // Each element is (vertex id, id of the component the vertex belongs to).
      val res_vertex: VertexRDD[VertexId] = graph.connectedComponents().vertices

      // Persist the mapping "identifier hash -> gid".
      res_vertex.toDF("gid_hc", "gid").write.parquet(outputDir)
    } finally {
      // Release the session even when one of the stages above fails.
      sparkSession.close()
    }
  }

  /**
   * Extracts the identifiers of every log line.
   *
   * All three log kinds share the same layout, so one extractor serves them all; a log with a
   * different layout needs its own extraction logic.
   *
   * @param applog log lines, one JSON document per line
   * @return for every input line the non-blank identifiers in the order
   *         uid, imei, mac, imsi, androidId, deviceId, uuid; an empty array for lines that
   *         cannot be parsed or that carry no `user` object
   */
  private def logds(applog: Dataset[String]): RDD[Array[String]] =
    applog.rdd.map(line => extractIds(line))

  /**
   * Parses a single log line and returns its non-blank identifiers.
   *
   * Malformed JSON, a missing `user` object or a missing `user.phone` object no longer abort
   * the job: such lines contribute whatever identifiers are available (possibly none).
   */
  private def extractIds(line: String): Array[String] = {
    val extracted: Option[Array[String]] = Try {
      for {
        root <- Option(JSON.parseObject(line)) // null for blank input
        user <- Option(root.getJSONObject("user"))
      } yield {
        val phoneIds: Seq[String] = Option(user.getJSONObject("phone")).toSeq
          .flatMap((phone: JSONObject) => PhoneIdKeys.map(key => phone.getString(key)))
        (user.getString("uid") +: phoneIds).toArray
      }
    }.toOption.flatten

    extracted.getOrElse(Array.empty[String]).filter(StringUtils.isNotBlank(_))
  }

  /**
   * Appends `child` to `parent`, reusing the separator style (`\` or `/`) already present in
   * `parent` so that both Windows paths and URI-style paths stay well-formed.
   */
  private def childPath(parent: String, child: String): String =
    if (parent.endsWith("/") || parent.endsWith("\\")) parent + child
    else {
      val separator: String = if (parent.contains("\\")) "\\" else "/"
      parent + separator + child
    }
}
